Send commit concurrently in client side #59
Conversation
Codecov Report

```diff
@@            Coverage Diff             @@
##           master      #59      +/-   ##
============================================
- Coverage   55.21%   55.16%    -0.05%
+ Complexity   1111     1110        -1
============================================
  Files         148      148
  Lines        7953     7962        +9
  Branches      760      760
============================================
+ Hits         4391     4392        +1
- Misses       3321     3328        +7
- Partials      241      242        +1
```

Continue to review full report at Codecov.
Do you have performance tests? I guess this PR can't improve the performance, because the performance bottleneck of …
Yes, I tested it. When using localfile mode, it cost 7.3 min. @jerqi
As far as I know, the spill-to-disk event needs to be triggered by the client side, so if the previous trigger is blocked, the next one will …
We don't recommend using the storageType …
```java
}

/**
 * This method will wait until all shuffle data have been spilled
```
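The Javadoc fragment above describes a wait-for-spill barrier. As a purely illustrative sketch (the class and method names here are hypothetical, not the PR's actual code), such a barrier can be expressed with a `CountDownLatch` that writers count down as each spill finishes:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SpillBarrierExample {
    // Illustrative only: one latch count per in-flight spill. Writers call
    // countDown() when their spill completes; the committer blocks here until
    // all shuffle data have been spilled, or the timeout expires.
    public static boolean waitForSpills(CountDownLatch inFlightSpills, long timeoutMs)
            throws InterruptedException {
        return inFlightSpills.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```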
Please put performance test results into …
```java
package org.apache.uniffle.client.util;
```

```java
import org.apache.hadoop.io.OutputBuffer;
```
```diff
  public ShuffleWriteClientImpl(String clientType, int retryMax, long retryIntervalMax, int heartBeatThreadNum,
      int replica, int replicaWrite, int replicaRead, boolean replicaSkipEnabled,
-     int dataTranferPoolSize) {
+     int dataTranferPoolSize, int commitSenderPoolSize) {
```
We prefer the code style below:

```java
public ShuffleWriteClientImpl(
    String clientType,
    int retryMax,
    long retryIntervalMax,
    int heartBeatThreadNum,
    int replica,
    int replicaWrite,
    int replicaRead,
    boolean replicaSkipEnabled,
    int dataTranferPoolSize,
    int commitSenderPoolSize) {
```
```java
public static final String RSS_COMMIT_SENDER_POOL_SIZE =
    MR_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE;
public static final int RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE =
    RssClientConfig.RSS_COMMIT_SENDER_POOL_SIZE_DEFAULT_VALUE;
```
The name's style should be consistent with `data_transfer_pool_size`. How about `data_commit_pool_size`?
Could you update the documentation about this feature?
4b5389f
If we close the ForkJoinPool within the scope of the method, I think it's OK.
Ok |
The performance of …
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
Besides, I think I can submit a new PR to let …
client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRUtils.java
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
client/src/main/java/org/apache/uniffle/client/factory/ShuffleClientFactory.java
client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
We'd better have performance tests.
```java
    });
  });
}).join();
} catch (Exception e) {
```
Should we use a `finally` block here?

```java
finally {
  forkJoinPool.shutdownNow();
}
```
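To illustrate the suggestion: a minimal, self-contained sketch of the pattern under discussion, where the method-scoped `ForkJoinPool` is always released in `finally` even if a task throws (the helper name and task list are hypothetical, not the PR's actual code):

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class CommitPoolExample {
    // Hypothetical helper: runs the given commit tasks on a method-scoped
    // ForkJoinPool. shutdownNow() in finally ensures the pool's threads are
    // not leaked when a task fails or join() throws.
    public static void runCommits(List<Runnable> commitTasks, int poolSize) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(poolSize);
        try {
            forkJoinPool.submit(
                () -> commitTasks.parallelStream().forEach(Runnable::run)
            ).join();
        } finally {
            forkJoinPool.shutdownNow();
        }
    }
}
```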
Could you update the documentation, since this PR introduces a user-facing change?
Done. @jerqi
What changes were proposed in this pull request?
Send commit concurrently on the client side.
Why are the changes needed?
I found that when using the `LOCALFILE` storageType, waiting for the commit costs too much time. To speed it up, the commit can be sent concurrently using a thread pool.

Performance Test Case
Using 1000 Spark executors (1g/1core each) to run TeraSort on 1TB.
When using `LOCALFILE` storageType mode, it cost 7.3 min. After applying this PR, it cost 6.1 min.
Does this PR introduce any user-facing change?
A new config `rss.client.data.commit.pool.size` is introduced; its default value is the number of assigned shuffle servers.

How was this patch tested?
No need
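To make the idea concrete, here is a rough sketch of sending the commit to each assigned shuffle server in parallel instead of sequentially. This is illustrative only: the `ShuffleServer` interface and method names are stand-ins, not the actual Uniffle API, and `poolSize` plays the role of `rss.client.data.commit.pool.size` (which this PR defaults to the number of assigned servers):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentCommitSketch {
    // Stand-in for a shuffle server handle (hypothetical, not the real API).
    public interface ShuffleServer {
        boolean sendCommit(int shuffleId);
    }

    // Sends the commit to all servers concurrently on a fixed-size pool.
    // Returns true only if every server acknowledges the commit.
    public static boolean sendCommitConcurrently(
            List<ShuffleServer> servers, int shuffleId, int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (ShuffleServer server : servers) {
                futures.add(pool.submit(() -> server.sendCommit(shuffleId)));
            }
            boolean allOk = true;
            for (Future<Boolean> future : futures) {
                try {
                    allOk &= future.get();
                } catch (ExecutionException e) {
                    allOk = false; // one failed commit fails the whole commit
                }
            }
            return allOk;
        } finally {
            pool.shutdown();
        }
    }
}
```

Since each commit request is independent network I/O, issuing them from `poolSize` threads lets the total wait approach the slowest single server rather than the sum of all servers, which matches the 7.3 min to 6.1 min improvement reported above.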